# Solutions/Mimecast/Data Connectors/MimecastSEG/MimecastCG/mimecast_cg_to_sentinel.py
"""Get mimecast cg data and ingest to custom table in sentinel."""
import inspect
import json
import time
from random import randrange
import gzip
import aiohttp
import asyncio
from aiohttp.client_exceptions import (
ClientError,
ServerTimeoutError,
ClientResponseError,
)
from ..SharedCode import consts
from ..SharedCode.mimecast_exception import MimecastException, MimecastTimeoutException
from ..SharedCode.logger import applogger
from ..SharedCode.state_manager import StateManager
from ..SharedCode.utils import Utils
from ..SharedCode.sentinel import post_data_async
from tenacity import RetryError
class MimecastCGToSentinel(Utils):
"""Class for ingest cg the data from mimecast to sentinel."""
def __init__(self, start_time) -> None:
"""Initialize MimecastDLPToSentinel object."""
super().__init__(consts.SEG_CG_FUNCTION_NAME)
self.check_environment_var_exist(
[
{"Base_Url": consts.BASE_URL},
{"WorkspaceID": consts.WORKSPACE_ID},
{"WorkspaceKey": consts.WORKSPACE_KEY},
{"Mimecast_Client_ID": consts.MIMECAST_CLIENT_ID},
{"Mimecast_Client_Secret": consts.MIMECAST_CLIENT_SECRET},
]
)
self.authenticate_mimecast_api()
self.start = start_time
self.checkpoint_obj = StateManager(
consts.CONN_STRING, "Checkpoint-SEG-CG", consts.FILE_SHARE_NAME
)
async def get_mimecast_cg_data_in_sentinel(self):
"""Get mimecast cg data and ingest data to sentinel, initialization method."""
__method_name = inspect.currentframe().f_code.co_name
try:
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Start fetching cg endpoint data using batch and async",
)
)
await self.get_batch_data_urls_from_api()
except MimecastTimeoutException:
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Mimecast: 9:00 mins executed hence breaking.",
)
)
return
except MimecastException:
raise MimecastException()
except Exception as err:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.UNEXPECTED_ERROR_MSG.format(err),
)
)
raise MimecastException()
    async def get_batch_data_urls_from_api(self):
        """Retrieve pages of S3 URLs from the Mimecast CG API and process each page.

        Makes repeated GET requests to the SEG_CG endpoint, following the "@nextPage"
        token. For each page, the batch of S3 URLs is handed to
        `process_s3_bucket_urls`; the page token is checkpointed only after a page
        is processed successfully, so a rerun resumes from the last good page.

        Raises:
            MimecastTimeoutException: when the function-app time budget is exhausted.
            MimecastException: on retry exhaustion, ingestion failure or any other error.
        """
        __method_name = inspect.currentframe().f_code.co_name
        try:
            # Resume from the page token persisted by a previous invocation, if any.
            checkpoint_data = self.get_checkpoint_data(self.checkpoint_obj)
            next_page = None
            if checkpoint_data:
                next_page = checkpoint_data.get("nextPage")
            else:
                checkpoint_data = {}
            url = "{}{}".format(consts.BASE_URL, consts.ENDPOINTS["SEG_CG"])
            params = {"type": consts.SEG_CG_TYPES, "pageSize": consts.ASYNC_PAGE_SIZE}
            page = 1
            while True:
                # Stop before the Azure Functions timeout; checkpoint lets the next run resume.
                if int(time.time()) >= self.start + consts.FUNCTION_APP_TIMEOUT_SECONDS:
                    raise MimecastTimeoutException()
                if next_page:
                    # NOTE(review): params is never cleared, so if a non-empty response
                    # ever omitted "@nextPage" the stale token would be re-sent and the
                    # same page refetched — confirm the API always supplies "@nextPage"
                    # until the final (empty) page.
                    params["nextPage"] = next_page
                applogger.debug(
                    self.log_format.format(
                        consts.LOGS_STARTS_WITH,
                        __method_name,
                        self.azure_function_name,
                        "Params = {}, url = {}, page {}".format(params, url, page),
                    )
                )
                applogger.info(
                    self.log_format.format(
                        consts.LOGS_STARTS_WITH,
                        __method_name,
                        self.azure_function_name,
                        "Iterating page {}".format(page),
                    )
                )
                response = self.make_rest_call(method="GET", url=url, params=params)
                next_page = response.get("@nextPage")
                values = response.get("value")
                # An empty "value" list marks the end of available data.
                if not values:
                    applogger.info(
                        self.log_format.format(
                            consts.LOGS_STARTS_WITH,
                            __method_name,
                            self.azure_function_name,
                            "No more data to fetch",
                        )
                    )
                    break
                url_list = [val.get("url") for val in values]
                applogger.info(
                    self.log_format.format(
                        consts.LOGS_STARTS_WITH,
                        __method_name,
                        self.azure_function_name,
                        "Found {} urls in response in page {}".format(
                            len(url_list), page
                        ),
                    )
                )
                result = await self.process_s3_bucket_urls(url_list, page)
                applogger.debug(
                    self.log_format.format(
                        consts.LOGS_STARTS_WITH,
                        __method_name,
                        self.azure_function_name,
                        "Next token = {}".format(next_page),
                    )
                )
                if result:
                    applogger.info(
                        self.log_format.format(
                            consts.LOGS_STARTS_WITH,
                            __method_name,
                            self.azure_function_name,
                            "Complete processing s3 bucket urls for page {}".format(
                                page
                            ),
                        )
                    )
                    # Checkpoint only after the page was fully processed so a failed
                    # run re-ingests the same page instead of skipping it.
                    checkpoint_data.update({"nextPage": next_page})
                    self.post_checkpoint_data(self.checkpoint_obj, checkpoint_data)
                else:
                    # process_s3_bucket_urls returned False: every task on the page failed.
                    applogger.error(
                        self.log_format.format(
                            consts.LOGS_STARTS_WITH,
                            __method_name,
                            self.azure_function_name,
                            "An error occurred while fetching data,"
                            "Please ensure that the Sentinel credentials are correct",
                        )
                    )
                    raise MimecastException()
                page += 1
        except MimecastTimeoutException:
            raise MimecastTimeoutException()
        except RetryError as error:
            # make_rest_call is wrapped by tenacity; this fires after all retries failed.
            applogger.error(
                self.log_format.format(
                    consts.LOGS_STARTS_WITH,
                    __method_name,
                    self.azure_function_name,
                    consts.MAX_RETRY_ERROR_MSG.format(
                        error, error.last_attempt.exception()
                    ),
                )
            )
            raise MimecastException()
        except MimecastException:
            raise MimecastException()
        except Exception as err:
            applogger.error(
                self.log_format.format(
                    consts.LOGS_STARTS_WITH,
                    __method_name,
                    self.azure_function_name,
                    consts.UNEXPECTED_ERROR_MSG.format(err),
                )
            )
            raise MimecastException()
async def process_s3_bucket_urls(self, url_list, page):
"""Process a list of S3 bucket URLs.
Args:
url_list (List[str]): A list of S3 bucket URLs.
page (int): page number
Returns:
bool: True if all tasks are completed.
"""
__method_name = inspect.currentframe().f_code.co_name
try:
async with aiohttp.ClientSession() as session:
tasks = []
for index, url in enumerate(url_list):
task = asyncio.create_task(
self.fetch_unzip_and_ingest_s3_url_data(index + 1, session, url)
)
tasks.append(task)
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"{} tasks created for page {}".format(len(tasks), page),
)
)
results = await asyncio.gather(*tasks, return_exceptions=True)
success_count = 0
for result in results:
if result is True:
success_count += 1
if success_count == 0 and len(url_list) > 0:
return False
if success_count == len(url_list):
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"All tasks are completed successfully for page {}".format(page),
)
)
else:
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"{} tasks failed for page {}".format(
(len(url_list) - success_count), page
),
)
)
return True
except MimecastException:
raise MimecastException()
except aiohttp.ClientError as session_err:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.CLIENT_ERROR_MSG.format(
"Error creating aiohttp.ClientSession: {} for page {}".format(
session_err, page
)
),
)
)
raise MimecastException()
except Exception as err:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.UNEXPECTED_ERROR_MSG.format(
"{} for page {}".format(err, page)
),
)
)
raise MimecastException()
def handle_corrupt_data(self, index, obj, corrupt_data):
"""Handle corrupt data by appending it to the corrupt_data list.
Args:
index (int): The index of the task.
obj: The object to be handled.
corrupt_data (list): A list to store corrupt data.
"""
__method_name = inspect.currentframe().f_code.co_name
try:
corrupt_data.append(str(obj))
except TypeError as err:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.TYPE_ERROR_MSG.format(
"{}, for task = {}".format(err, index)
),
)
)
    async def decompress_and_make_json(self, index, response):
        """Decompress a gzip response body and parse it as newline-delimited JSON.

        Args:
            index (int): The task index.
            response (aiohttp.ClientResponse): The response object.

        Returns:
            list: A list of parsed JSON objects (corrupt lines are excluded).

        Raises:
            MimecastException: on read, decompression, decode or unexpected failure.
        """
        __method_name = inspect.currentframe().f_code.co_name
        try:
            applogger.info(
                self.log_format.format(
                    consts.LOGS_STARTS_WITH,
                    __method_name,
                    self.azure_function_name,
                    "Read zip, Decompress zip and make json from events for task {}".format(
                        index
                    ),
                )
            )
            # Payload is a gzip-compressed, newline-delimited JSON document.
            gzipped_content = await response.read()
            decompressed_data = gzip.decompress(gzipped_content)
            # errors="replace" keeps processing alive even if a line has invalid UTF-8.
            decompressed_content = decompressed_data.decode("utf-8", errors="replace")
            json_objects = []
            corrupt_data = []
            for obj in decompressed_content.splitlines():
                try:
                    obj = obj.strip()
                    if obj:
                        json_objects.append(json.loads(obj))
                except json.JSONDecodeError:
                    # Collect unparseable lines instead of failing the whole task.
                    self.handle_corrupt_data(index, obj, corrupt_data)
                    continue
            if corrupt_data:
                # Persist corrupt lines to a timestamped file share entry for later inspection.
                curent_corrupt_data_obj = StateManager(
                    consts.CONN_STRING,
                    "Corrupt-Data-Cloud-Gateway_{}".format(str(int(time.time()))),
                    consts.FILE_SHARE_NAME,
                )
                applogger.info(
                    self.log_format.format(
                        consts.LOGS_STARTS_WITH,
                        __method_name,
                        self.azure_function_name,
                        "Posting corrupted data into checkpoint file for task: {}".format(
                            index
                        ),
                    )
                )
                self.post_checkpoint_data(curent_corrupt_data_obj, corrupt_data)
            return json_objects
        except MimecastException:
            raise MimecastException()
        except aiohttp.ClientError as err:
            applogger.error(
                self.log_format.format(
                    consts.LOGS_STARTS_WITH,
                    __method_name,
                    self.azure_function_name,
                    consts.CLIENT_ERROR_MSG.format(
                        "Error reading response: {}, for task = {}".format(err, index)
                    ),
                )
            )
            raise MimecastException()
        except gzip.BadGzipFile as err:
            applogger.error(
                self.log_format.format(
                    consts.LOGS_STARTS_WITH,
                    __method_name,
                    self.azure_function_name,
                    "gzip file is corrupted or Invalid: {}, for task = {}".format(
                        err, index
                    ),
                )
            )
            raise MimecastException()
        except UnicodeDecodeError as err:
            # NOTE(review): unlikely to fire since decode() uses errors="replace".
            applogger.error(
                self.log_format.format(
                    consts.LOGS_STARTS_WITH,
                    __method_name,
                    self.azure_function_name,
                    "Error decoding decompressed data: {}, for task = {}".format(
                        err, index
                    ),
                )
            )
            raise MimecastException()
        except (OSError, IOError) as err:
            applogger.error(
                self.log_format.format(
                    consts.LOGS_STARTS_WITH,
                    __method_name,
                    self.azure_function_name,
                    "Error decompressing data: {}, for task = {}".format(err, index),
                )
            )
            raise MimecastException()
        except json.JSONDecodeError as err:
            # NOTE(review): effectively unreachable — per-line JSONDecodeError is
            # already handled inside the loop above.
            applogger.error(
                self.log_format.format(
                    consts.LOGS_STARTS_WITH,
                    __method_name,
                    self.azure_function_name,
                    consts.JSON_DECODE_ERROR_MSG.format(
                        "Error parsing JSON: {}, for task = {}".format(err, index)
                    ),
                )
            )
            raise MimecastException()
        except Exception as err:
            applogger.error(
                self.log_format.format(
                    consts.LOGS_STARTS_WITH,
                    __method_name,
                    self.azure_function_name,
                    consts.UNEXPECTED_ERROR_TASK_MSG.format(err, index),
                )
            )
            raise MimecastException()
async def fetch_unzip_and_ingest_s3_url_data(
self, index, session: aiohttp.ClientSession, url
):
"""Fetch, unzip, and ingest data from a given S3 URL.
Args:
index (int): The index of the task.
session (aiohttp.ClientSession): The session to use for making the HTTP request.
url (str): The URL of the S3 file.
Returns:
bool: True if the data was successfully ingested, False otherwise.
"""
__method_name = inspect.currentframe().f_code.co_name
try:
for _ in range(consts.MAX_RETRIES_ASYNC):
try:
response = await self.make_async_call(session, url, index)
response_json = await self.decompress_and_make_json(index, response)
if len(response_json) > 0:
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Data len = {}, Ingesting data to sentinel for task = {}".format(
len(response_json), index
),
)
)
mapping_dict = consts.FILE_PREFIX_MC_TYPE
for data in response_json:
data["type"] = mapping_dict.get(data.get("type"))
await post_data_async(
index,
json.dumps(response_json),
session,
consts.TABLE_NAME["SEG_CG"],
)
return True
return False
except MimecastException:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Retry.. , for task = {}".format(index),
)
)
time.sleep(randrange(2, 10))
continue
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Max retries exceeded, for task = {}".format(index),
)
)
raise MimecastException()
except MimecastException:
raise MimecastException()
except Exception as err:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.UNEXPECTED_ERROR_TASK_MSG.format(err, index),
)
)
raise MimecastException()
async def make_async_call(self, session, url, index):
"""Make an asynchronous call to the given URL using the provided session.
Args:
session (aiohttp.ClientSession): The session to use for making the HTTP request.
url (str): The URL to make the call to.
index (int): The index of the task.
Returns:
aiohttp.ClientResponse: The response object if the call is successful.
"""
__method_name = inspect.currentframe().f_code.co_name
try:
applogger.debug(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Get Call, for task = {}".format(index),
)
)
response = await session.get(url)
if response.status >= 200 and response.status <= 299:
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Success, Status code : {} for task = {}".format(
response.status, index
),
)
)
return response
elif response.status == 429:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Too Many Requests, Status code : {} for task = {}".format(
response.status, index
),
)
)
raise MimecastException()
else:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Unexpected Error = {}, Status code : {} for task = {}".format(
response.text, response.status, index
),
)
)
raise MimecastException()
except MimecastException:
raise MimecastException()
except ClientResponseError as err:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Client response error: {} - {}, for task = {}".format(
err.status, err.message, index
),
)
)
raise MimecastException()
except ServerTimeoutError as err:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Server timeout error: {}, for task = {}".format(err, index),
)
)
raise MimecastException()
except ClientError as err:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Client error: {}, for task = {}".format(err, index),
)
)
raise MimecastException()
except asyncio.TimeoutError as err:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Request timeout error: {}, for task = {}".format(err, index),
)
)
raise MimecastException()
except Exception as err:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.UNEXPECTED_ERROR_TASK_MSG.format(err, index),
)
)
raise MimecastException()